/*
 *    Copyright 2022 The DSMS Authors.
 *
 *    Licensed under the Apache License, Version 2.0 (the "License");
 *    you may not use this file except in compliance with the License.
 *    You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *    Unless required by applicable law or agreed to in writing, software
 *    distributed under the License is distributed on an "AS IS" BASIS,
 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *    See the License for the specific language governing permissions and
 *    limitations under the License.
 */

package com.dsms.modules.overview.service.impl;

import com.alibaba.fastjson2.JSONObject;
import com.dsms.common.constant.NodeStatusEnum;
import com.dsms.common.constant.OdsStatusEnum;
import com.dsms.common.constant.PromQL;
import com.dsms.common.constant.ResultCode;
import com.dsms.common.exception.DsmsEngineException;
import com.dsms.common.model.Result;
import com.dsms.common.prometheus.builder.InstantQueryBuilder;
import com.dsms.common.prometheus.builder.QueryBuilderType;
import com.dsms.common.prometheus.builder.RangeQueryBuilder;
import com.dsms.common.prometheus.converter.result.DefaultQueryResult;
import com.dsms.common.prometheus.converter.result.MatrixData;
import com.dsms.common.prometheus.converter.result.QueryResultItemValue;
import com.dsms.common.prometheus.converter.result.VectorData;
import com.dsms.common.util.PrometheusUtils;
import com.dsms.dfsbroker.cluster.api.ClusterApi;
import com.dsms.dfsbroker.cluster.model.remote.StatusResult;
import com.dsms.dfsbroker.cluster.model.remote.StatusResult.HealthCheckDTO;
import com.dsms.dfsbroker.node.api.NodeApi;
import com.dsms.dfsbroker.node.model.remote.ResponseMon;
import com.dsms.dfsbroker.osd.osd.api.OsdApi;
import com.dsms.dfsbroker.osd.osd.model.remote.OSDStatusResult;
import com.dsms.modules.monitor.model.dto.Bps;
import com.dsms.modules.monitor.model.dto.ClusterStatusDTO;
import com.dsms.modules.monitor.model.dto.Ops;
import com.dsms.modules.overview.model.dto.OverviewDTO;
import com.dsms.modules.overview.service.IOverviewService;
import com.dsms.modules.util.PrometheusUtil;
import com.dsms.modules.util.RemoteCallUtil;
import java.util.Collection;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;

import java.net.URI;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

@Service
@Slf4j
public class IOverviewServiceImpl implements IOverviewService {

    private final ClusterApi clusterApi;
    private final NodeApi nodeApi;
    private final OsdApi osdApi;

    @Autowired
    public IOverviewServiceImpl (ClusterApi clusterApi, NodeApi nodeApi, OsdApi osdApi){
        this.clusterApi = clusterApi;
        this.nodeApi = nodeApi;
        this.osdApi = osdApi;
    }



    @Override
    public Result<OverviewDTO> getOverView() {
        StatusResult clusterStatus;
        List<OSDStatusResult> osdStatusResults;
        try {
            clusterStatus = clusterApi.getStatus(RemoteCallUtil.generateRemoteRequest());
            osdStatusResults = osdApi.listOsdStatus(RemoteCallUtil.generateRemoteRequest());
        } catch (Throwable e) {
            throw new DsmsEngineException(ResultCode.MON_GETCLUSTERSTATUS_ERROR);
        }
        //1.get cluster health status
        String healthStatus = clusterStatus.getHealth().getStatus();

        //1.1 get cluster health detail
        Collection<JSONObject> values = clusterStatus.getHealth().getChecks().values();
        String healthStatusDetail = values.stream()
                                          .map(object -> object.to(HealthCheckDTO.class))
                                          .map(healthCheckDTO -> healthCheckDTO.getSummary().getMessage())
                                          .collect(Collectors.joining("\n"));

        //2.get osd which status both 'up' and 'in'
        List<OSDStatusResult> normalOsds = osdStatusResults.stream()
                .filter(item -> item.getState().contains(OdsStatusEnum.EXISTS.getStatus()) && item.getState().contains(OdsStatusEnum.UP.getStatus()) && item.getWeight() > 0)
                .collect(Collectors.toList());
        ClusterStatusDTO.OsdDTO osd = new ClusterStatusDTO.OsdDTO(clusterStatus.getOsdmap().getNumOsds(), normalOsds.size());

        //construct QueryBuilder
        InstantQueryBuilder instantQueryBuilder = PrometheusUtil.getQueryBuilder(QueryBuilderType.InstantQuery);
        RangeQueryBuilder rangeQueryBuilder = PrometheusUtil.getQueryBuilder(QueryBuilderType.RangeQuery);

        //3.get ops&bps and the specified time is within the last hour
        long startTimeStamp = LocalDateTime.now().minusHours(1).toEpochSecond(ZoneOffset.ofHours(8));
        long endTimeStamp = LocalDateTime.now().toEpochSecond(ZoneOffset.ofHours(8));

        //3.1 ops
        URI clusterWriteOpsUri = rangeQueryBuilder.build(PromQL.CLUSTER_WRITE_OPS, startTimeStamp, endTimeStamp);
        DefaultQueryResult<MatrixData> clusterWriteOps = PrometheusUtils.convertQueryResultString(PrometheusUtil.generatePrometheusTemplate().getForObject(clusterWriteOpsUri, String.class));
        URI clusterReadOpsUri = rangeQueryBuilder.build(PromQL.CLUSTER_READ_OPS, startTimeStamp, endTimeStamp);
        DefaultQueryResult<MatrixData> clusterReadOps = PrometheusUtils.convertQueryResultString(PrometheusUtil.generatePrometheusTemplate().getForObject(clusterReadOpsUri, String.class));
        Ops ops = new Ops(clusterReadOps.getQueryResult(), clusterWriteOps.getQueryResult());

        //3.2 bps
        URI clusterWriteBpsUri = rangeQueryBuilder.build(PromQL.CLUSTER_WRITE_BPS, startTimeStamp, endTimeStamp);
        DefaultQueryResult<MatrixData> clusterWriteBps = PrometheusUtils.convertQueryResultString(PrometheusUtil.generatePrometheusTemplate().getForObject(clusterWriteBpsUri, String.class));
        URI clusterReadBpsUri = rangeQueryBuilder.build(PromQL.CLUSTER_READ_BPS, startTimeStamp, endTimeStamp);
        DefaultQueryResult<MatrixData> clusterReadBps = PrometheusUtils.convertQueryResultString(PrometheusUtil.generatePrometheusTemplate().getForObject(clusterReadBpsUri, String.class));
        Bps bps = new Bps(clusterReadBps.getQueryResult(), clusterWriteBps.getQueryResult());

        //4.get node info
        List<OverviewDTO.NodeDTO> nodeList = new ArrayList<>();
        List<ResponseMon> monList = null;
        try {
            monList = nodeApi.getNodeList(RemoteCallUtil.generateRemoteRequest());
        } catch (Throwable e) {
            throw new DsmsEngineException(e, ResultCode.NODE_LISTNODE_ERROR);
        }
        if (!ObjectUtils.isEmpty(monList)) {
            //onlineNodes contains nodes with normal osd, mon, or mgr leader,a node is online as long as it meets one of these three conditions
            List<String> onlineNodes = new ArrayList<>();
            List<String> quorumNames = clusterStatus.getQuorumNames();
            String mgrLeader = clusterStatus.getMgrmap().getLeader();
            List<String> osdUpNodes = osdStatusResults.stream()
                    .filter(item -> item.getState().contains(OdsStatusEnum.EXISTS.getStatus()) && item.getState().contains(OdsStatusEnum.UP.getStatus()))
                    .map(OSDStatusResult::getServer)
                    .distinct()
                    .collect(Collectors.toList());
            onlineNodes.addAll(osdUpNodes);
            onlineNodes.addAll(quorumNames);
            osdUpNodes.add(mgrLeader);
            onlineNodes = onlineNodes.stream().filter(Objects::nonNull).distinct().collect(Collectors.toList());

            for (ResponseMon responseMon : monList) {
                String nodeName = responseMon.getName();
                String ipAddress = responseMon.getAddr();
                if (ObjectUtils.isEmpty(ipAddress)) {
                    continue;
                }
                //verify nodeStatus
                String nodeIp = ipAddress.substring(0, ipAddress.indexOf(":"));
                int nodeStatus = NodeStatusEnum.DOWN.getStatus();
                //set nodeStatus
                if (onlineNodes.contains(nodeName)) {
                    nodeStatus = NodeStatusEnum.UP.getStatus();
                }

                //get node cpu usage
                URI cpuUsageUri = instantQueryBuilder.withQuery(PrometheusUtils.getFormatPromQL(PromQL.NODE_CPU_USAGE_TOTAL, new String[]{nodeName, nodeName})).build();
                DefaultQueryResult<VectorData> cpuUsageResponse = PrometheusUtils.convertQueryResultString(PrometheusUtil.generatePrometheusTemplate().getForObject(cpuUsageUri, String.class));
                QueryResultItemValue cpuUsage = cpuUsageResponse.getQueryResult();

                Double cpuUsagePercent = Optional.ofNullable(cpuUsage)
                        .map(QueryResultItemValue::getPercentageValue)
                        .orElse(null);

                //get node memory usage
                URI memoryUsageUri = instantQueryBuilder.withQuery(PrometheusUtils.getFormatPromQL(PromQL.NODE_MEMORY_USED_RATE, new String[]{nodeName, nodeName, nodeName, nodeName, nodeName, nodeName})).build();
                DefaultQueryResult<VectorData> memoryUsageResponse = PrometheusUtils.convertQueryResultString(PrometheusUtil.generatePrometheusTemplate().getForObject(memoryUsageUri, String.class));
                QueryResultItemValue memoryUsage = memoryUsageResponse.getQueryResult();

                Double memoryUsagePercent = Optional.ofNullable(memoryUsage)
                        .map(QueryResultItemValue::getPercentageValue)
                        .orElse(null);

                //construct node
                OverviewDTO.NodeDTO node = new OverviewDTO.NodeDTO(nodeName, nodeIp, nodeStatus, cpuUsagePercent, memoryUsagePercent);
                nodeList.add(node);
            }
        }

        //5.get cluster capacity usage
        URI capacityUsageUri = instantQueryBuilder.withQuery(PromQL.CLUSTER_CAPACITY_USAGE).build();
        DefaultQueryResult<VectorData> capacityUsageResponse = PrometheusUtils.convertQueryResultString(PrometheusUtil.generatePrometheusTemplate().getForObject(capacityUsageUri, String.class));
        QueryResultItemValue capacityUsage = capacityUsageResponse.getQueryResult();

        //When there is no osd in the cluster, it returns null; when there is an osd, it returns a percentage value
        Double capacityUsagePercent = Optional.ofNullable(capacityUsage)
                .map(QueryResultItemValue::getPercentageValue)
                .orElse(null);

        //construct return object
        OverviewDTO overviewDTO = new OverviewDTO(healthStatus, healthStatusDetail, osd, nodeList, ops, bps, capacityUsagePercent);
        return Result.OK(overviewDTO);
    }
}
